{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Splitting a Model"
   ]
  },
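  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In production there are many cases where a model has to be split into several parts. For example, the [ESMM](https://arxiv.org/pdf/1804.07931.pdf) model trains two models jointly; to serve online prediction, the two models have to be saved separately so that the online logic can load them on different servers and run inference.\n",
    "\n",
    "Splitting a model also greatly simplifies online inference. Below we use the wide_deep model to show how to split a model in TensorFlow and how to use the result together with TensorNet. The code lives in [examples/wide_deep.py](../../examples/wide_deep.py) and can be run directly."
   ]
  },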
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The code below is the same as in [quick start with wide deep](01-begin-with-wide-deep.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "sys.path.append('/da2/zhangyansheng/tensornet')  # set this to the location of your tensornet package"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from datetime import datetime\n",
    "\n",
    "import tensorflow as tf\n",
    "import tensornet as tn\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "class Config(object):\n",
    "    FILE_MATCH_PATTERN = \"tf-*\"\n",
    "    BATCH_SIZE = 32\n",
    "    DEEP_HIDDEN_UNITS = [512, 256, 256]\n",
    "\n",
    "    WIDE_SLOTS = [ \"1\",\"2\",\"3\",\"4\"]\n",
    "    DEEP_SLOTS = [ \"1\",\"2\",\"3\",\"4\"]\n",
    "\n",
    "C = Config\n",
    "\n",
    "def columns_builder():\n",
    "    \"\"\"Builds a set of wide and deep feature columns.\"\"\"\n",
    "\n",
    "    columns = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        columns[slot] = tn.feature_column.category_column(key=slot)\n",
    "\n",
    "    wide_columns = []\n",
    "    for slot in C.WIDE_SLOTS:\n",
    "        feature_column = tf.feature_column.embedding_column(columns[slot], dimension=1)\n",
    "\n",
    "        wide_columns.append(feature_column)\n",
    "\n",
    "    deep_columns = []\n",
    "    for slot in C.DEEP_SLOTS:\n",
    "        feature_column = tf.feature_column.embedding_column(columns[slot], dimension=8)\n",
    "        deep_columns.append(feature_column)\n",
    "\n",
    "    return wide_columns, deep_columns\n",
    "\n",
    "\n",
    "def parse_line_batch(example_proto):\n",
    "    fea_desc = {\n",
    "        \"label\": tf.io.FixedLenFeature([], tf.int64)\n",
    "    }\n",
    "\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        fea_desc[slot]  = tf.io.VarLenFeature(tf.int64)\n",
    "\n",
    "    feature_dict = tf.io.parse_example(example_proto, fea_desc)\n",
    "\n",
    "    # [batch_size, label]\n",
    "    label = feature_dict.pop('label')\n",
    "\n",
    "    return feature_dict, label\n",
    "\n",
    "\n",
    "def read_dataset(data_path, days, match_pattern, num_parallel_calls = 12):\n",
    "    ds_data_files = tn.data.list_files(data_path, days=days, match_pattern=match_pattern)\n",
    "    dataset = ds_data_files.shard(num_shards=tn.core.shard_num(), index=tn.core.self_shard_id())\n",
    "    dataset = dataset.interleave(lambda f: tf.data.TFRecordDataset(f, buffer_size=1024 * 100),\n",
    "                                       cycle_length=4, block_length=8,\n",
    "                                       num_parallel_calls=tf.data.experimental.AUTOTUNE)\n",
    "    dataset = dataset.batch(C.BATCH_SIZE)\n",
    "    dataset = dataset.map(map_func=parse_line_batch, num_parallel_calls=num_parallel_calls)\n",
    "    dataset = tn.data.BalanceDataset(dataset)\n",
    "    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)\n",
    "\n",
    "    return dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Building the Split Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### embedding model"
   ]
  },
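  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We split the embedding lookup into a separate model, which does not need to be loaded for online inference. The online side only has to export the sparse embedding data saved by TensorNet as a dictionary, look up the embeddings for each request, and then feed them into the second model described below."
   ]
  },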
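  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The online lookup described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not TensorNet's actual export format or API: the tables `wide_table` and `deep_table`, the helper `lookup`, the feature signs, and the use of sum pooling are all hypothetical. Use whatever combiner and dictionary layout your training and export steps actually produce.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "# Hypothetical exported tables: feature sign -> embedding vector (values made up).\n",
    "WIDE_DIM, DEEP_DIM = 1, 8\n",
    "wide_table = {101: np.full(WIDE_DIM, 0.5, dtype=np.float32)}\n",
    "deep_table = {101: np.full(DEEP_DIM, 0.1, dtype=np.float32)}\n",
    "\n",
    "def lookup(table, signs, dim):\n",
    "    # Sum the embeddings of all signs in one slot (sum pooling is an assumption);\n",
    "    # signs missing from the table contribute zeros.\n",
    "    out = np.zeros(dim, dtype=np.float32)\n",
    "    for sign in signs:\n",
    "        out += table.get(sign, np.zeros(dim, dtype=np.float32))\n",
    "    return out\n",
    "\n",
    "# One request: slot '1' carries two feature signs, of which 202 is unseen.\n",
    "request = {'1': [101, 202]}\n",
    "wide_emb = lookup(wide_table, request['1'], WIDE_DIM)\n",
    "deep_emb = lookup(deep_table, request['1'], DEEP_DIM)\n",
    "print(wide_emb.shape, deep_emb.shape)\n",
    "```\n",
    "\n",
    "After a batch dimension is added, vectors like these would be passed to the `wide_emb_*` and `deep_emb_*` inputs of the inference model."
   ]
  },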
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_emb_model(wide_columns, deep_columns):\n",
    "    wide_embs, deep_embs = [], []\n",
    "\n",
    "    inputs = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype=\"int64\", sparse=True)\n",
    "\n",
    "    sparse_opt = tn.core.AdaGrad(learning_rate=0.01, initial_g2sum=0.1, initial_scale=0.1)\n",
    "\n",
    "    if wide_columns:\n",
    "        wide_embs = tn.layers.EmbeddingFeatures(wide_columns, sparse_opt, name='wide_inputs')(inputs)\n",
    "\n",
    "    if deep_columns:\n",
    "        deep_embs = tn.layers.EmbeddingFeatures(deep_columns, sparse_opt, name='deep_inputs')(inputs)\n",
    "\n",
    "    # must put wide embs at front of outputs list\n",
    "    emb_model = tn.model.Model(inputs=inputs, outputs=[wide_embs, deep_embs], name=\"emb_model\")\n",
    "\n",
    "    return emb_model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### online inference model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The inference model takes the output of the previous model as its input. This is the model that is actually used for online prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_sub_model(wide_emb_input_shapes, deep_emb_input_shapes):\n",
    "    wide, deep = None, None\n",
    "\n",
    "    wide_inputs = [tf.keras.layers.Input(name=\"wide_emb_{}\".format(i), dtype=\"float32\", shape=shape[1:])\n",
    "                    for i, shape in enumerate(wide_emb_input_shapes)]\n",
    "\n",
    "    deep_inputs = [tf.keras.layers.Input(name=\"deep_emb_{}\".format(i), dtype=\"float32\", shape=shape[1:])\n",
    "                    for i, shape in enumerate(deep_emb_input_shapes)]\n",
    "\n",
    "    if wide_inputs:\n",
    "        wide = tf.keras.layers.Concatenate(name='wide_concact', axis=-1)(wide_inputs)\n",
    "\n",
    "    if deep_inputs:\n",
    "        deep = tf.keras.layers.Concatenate(name='deep_concact', axis=-1)(deep_inputs)\n",
    "\n",
    "        for i, unit in enumerate(C.DEEP_HIDDEN_UNITS):\n",
    "            deep = tf.keras.layers.Dense(unit, activation='relu', name='dnn_{}'.format(i))(deep)\n",
    "\n",
    "    if wide_inputs and not deep_inputs:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(wide)\n",
    "    elif deep_inputs and not wide_inputs:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep)\n",
    "    else:\n",
    "        both = tf.keras.layers.concatenate([deep, wide], name='both')\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)\n",
    "\n",
    "    model = tn.model.Model(inputs=[wide_inputs, deep_inputs], outputs=output, name=\"sub_model\")\n",
    "\n",
    "    return model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The full model is assembled below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_model(wide_columns, deep_columns):\n",
    "    inputs = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype=\"int64\", sparse=True)\n",
    "\n",
    "    emb_model = create_emb_model(wide_columns, deep_columns)\n",
    "\n",
    "    assert len(emb_model.output) == 2, \"expected emb_model output length is 2 but {}\".format(emb_model.output)\n",
    "    wide_emb_input_shapes = [emb.shape for emb in emb_model.output[0]]\n",
    "    deep_emb_input_shapes = [emb.shape for emb in emb_model.output[1]]\n",
    "\n",
    "    wide_embs, deep_embs = emb_model(inputs)\n",
    "    sub_model = create_sub_model(wide_emb_input_shapes, deep_emb_input_shapes)\n",
    "    output = sub_model([wide_embs, deep_embs])\n",
    "    model = tn.model.Model(inputs=inputs, outputs=output, name=\"full_model\")\n",
    "\n",
    "    dense_opt = tn.core.Adam(learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8)\n",
    "    model.compile(optimizer=tn.optimizer.Optimizer(dense_opt),\n",
    "                  loss='binary_crossentropy',\n",
    "                  metrics=['acc', \"mse\", \"mae\", 'mape', tf.keras.metrics.AUC(),\n",
    "                           tn.metric.CTR(), tn.metric.PCTR(), tn.metric.COPC()])\n",
    "\n",
    "    return model, sub_model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The test data is the same as in [quick start with wide deep](01-begin-with-wide-deep.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "TEST_DATA_PATH = \"/tmp/wide-deep-test/data\"\n",
    "MODEL_DIR = '/tmp/wide-deep-test/model'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the code below saves only `sub_model` in TensorFlow's `SavedModel` format; all other parameters are saved automatically by `tn.callbacks.PsWeightCheckpoint`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "750/750 [==============================] - 7s 9ms/step - loss: 0.6933 - acc: 0.5009 - mse: 0.2501 - mae: 0.5000 - mape: 250785184.0000 - auc: 0.5009 - CTR: 0.4948 - PCTR: 0.4964 - COPC: 0.9968\n",
      "WARNING:tensorflow:From /da2/zhangyansheng/tf_package/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py:1817: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "If using Keras pass *_constraint arguments to layers.\n",
      "INFO:tensorflow:Assets written to: /tmp/wide-deep-test/model/saved_model/assets\n"
     ]
    }
   ],
   "source": [
    "def main():\n",
    "    strategy = tn.distribute.PsStrategy()\n",
    "\n",
    "    with strategy.scope():\n",
    "        wide_column, deep_column = columns_builder()\n",
    "        model, sub_model = create_model(wide_column, deep_column)\n",
    "\n",
    "        train_dataset = read_dataset(TEST_DATA_PATH, [''], C.FILE_MATCH_PATTERN)\n",
    "\n",
    "        cp_cb = tn.callbacks.PsWeightCheckpoint(MODEL_DIR, need_save_model=True, dt=\"\")\n",
    "        model.fit(train_dataset, epochs=1, verbose=1, callbacks=[cp_cb])\n",
    "\n",
    "        infer_batch_size = 100\n",
    "        for tensor in sub_model.inputs:\n",
    "            tensor.set_shape([infer_batch_size] + list(tensor.shape)[1:])\n",
    "\n",
    "        sub_model.save(MODEL_DIR + '/saved_model')\n",
    "    return\n",
    "\n",
    "main()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The saved directory layout is as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/tmp/wide-deep-test/model\r\n",
      "├── checkpoint\r\n",
      "├── _checkpoint\r\n",
      "├── dense_table\r\n",
      "│   └── 0\r\n",
      "│       └── 0\r\n",
      "├── saved_model\r\n",
      "│   ├── assets\r\n",
      "│   ├── saved_model.pb\r\n",
      "│   └── variables\r\n",
      "│       ├── variables.data-00000-of-00001\r\n",
      "│       └── variables.index\r\n",
      "├── sparse_table\r\n",
      "│   ├── 0\r\n",
      "│   │   └── rank_0\r\n",
      "│   │       ├── sparse_block_0.gz\r\n",
      "│   │       ├── sparse_block_1.gz\r\n",
      "│   │       ├── sparse_block_2.gz\r\n",
      "│   │       ├── sparse_block_3.gz\r\n",
      "│   │       ├── sparse_block_4.gz\r\n",
      "│   │       ├── sparse_block_5.gz\r\n",
      "│   │       ├── sparse_block_6.gz\r\n",
      "│   │       └── sparse_block_7.gz\r\n",
      "│   └── 1\r\n",
      "│       └── rank_0\r\n",
      "│           ├── sparse_block_0.gz\r\n",
      "│           ├── sparse_block_1.gz\r\n",
      "│           ├── sparse_block_2.gz\r\n",
      "│           ├── sparse_block_3.gz\r\n",
      "│           ├── sparse_block_4.gz\r\n",
      "│           ├── sparse_block_5.gz\r\n",
      "│           ├── sparse_block_6.gz\r\n",
      "│           └── sparse_block_7.gz\r\n",
      "├── tf_checkpoint.data-00000-of-00001\r\n",
      "└── tf_checkpoint.index\r\n",
      "\r\n",
      "10 directories, 24 files\r\n"
     ]
    }
   ],
   "source": [
    "! tree $MODEL_DIR"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In addition to the standard TensorFlow checkpoint, TensorNet's own checkpoint is saved alongside it. `dense_table` holds all non-`sparse` parameters that need to be synchronized; most developers can ignore it and only need to look at the `sparse_table` directory. [05-export-sparse-feature-embedding.ipynb](./05-export-sparse-feature-embedding.ipynb) explains how to export the sparse feature data as a dictionary for online use, and [04-deploy-tf-graph-online.ipynb](./04-deploy-tf-graph-online.ipynb) provides an online prediction example."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Summary"
   ]
  },
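  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The example above used TensorNet's online prediction scenario to show how to split a model: the sparse embedding model is separated from the online prediction model, which reduces the cost of online prediction. The same idea applies to other kinds of models; once the split is done, save each part according to your specific needs."
   ]
  }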
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
